package com.corn.flink.lesson6;

import com.corn.flink.lesson4.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Demo of the DataStream {@code union} operator: two element-backed streams of
 * {@link Event} are each given event-time timestamps and watermarks, merged into
 * one stream, and printed.
 *
 * @author JimWu
 * @date 2023/4/17 15:34
 **/
public class UnionOperatorDemo {

    /**
     * Builds the two source streams, unions them and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // First input: ten events with timestamps in ascending order.
        DataStream<Event> firstStream = environment
                .fromElements(
                        new Event("add", 1000),
                        new Event("delete", 2000),
                        new Event("update", 3000),
                        new Event("select", 5000),
                        new Event("add", 6000),
                        new Event("delete", 7000),
                        new Event("select", 8000),
                        new Event("select", 9000),
                        new Event("select", 10000),
                        new Event("select", 11000))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        // Second input: five "select" events, with 3000 listed before 2000.
        DataStream<Event> secondStream = environment
                .fromElements(
                        new Event("select", 1000),
                        new Event("select", 3000),
                        new Event("select", 2000),
                        new Event("select", 4000),
                        new Event("select", 5000))
                .assignTimestampsAndWatermarks(eventTimeStrategy());

        // Merge both inputs into a single stream and print it under the "unionDs" label.
        firstStream.union(secondStream).print("unionDs");

        environment.execute();
    }

    /**
     * Creates the watermark strategy applied to both inputs: bounded out-of-orderness
     * of two seconds, with the event timestamp read from {@code Event.timestamp}.
     *
     * @return a new watermark strategy instance for {@link Event} streams
     */
    private static WatermarkStrategy<Event> eventTimeStrategy() {
        return WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);
    }
}
